GCS の Parquet データを BigQuery から参照してみた ~ Parquet データのロードと外部テーブルによる参照~
こんにちは、みかみです。
やりたいこと
- GCS に配置した Parquet データを BigQuery で参照したい
- BigQuery に Parquet データをロードしたい
- BigQuery に Parquet データをロードする場合、意図通りのデータ型を自動検出してテーブル作成してくれるかどうか確認したい
- BigQuery に Parquet データをロードする場合の制限事項を知りたい
- パーティショニングされた Parquet データを BigQuery から参照するにはどうすればいいのか知りたい
Parquet データを準備
以下のサイトで動作確認用の CSV データを作成しました。
できたサンプルデータはこんな感じです。
id,name,kana,postal,tel,mail,password,ip,url,create_timestamp,reg_date,valid,bit,code 1,大和 貫一,ヤマト カンイチ,262-8517,080-6081-7739,pzRz6cx6B@test.org,mp5x_0uQ,150.110.95.198,http://sample.net,1988/2/2 7:09:28,1991/1/28,False,0,2607 2,篠原 茂志,シノハラ シゲシ,025-6473,090-7899-2524,BWeLhiJPN@test.jp,rzFsiiYl,16.169.224.193,http://test.jp,2008/11/15 8:42:14,1972/4/25,True,0,6750 3,近江 智恵子,オウミ チエコ,129-0310,080-6093-0838,qDnzBhS@test.net,QAOoF2NK,161.55.51.87,http://test.com,2016/4/14 18:47:33,2000/8/19,True,1,4208 4,藤原 長治,フヂワラ チョウジ,138-8408,090-7532-4618,CGxHpmaS2@sample.net,uwSE1kHr,171.132.206.6,http://sample.net,2018/11/18 12:18:54,1972/3/4,True,0,8333 5,湯川 秀明,ユカワ ヒデアキ,326-4046,080-1220-3065,IgJh4@example.com,lOuOvfr1,247.118.177.127,http://test.net,2015/11/26 23:13:43,1981/4/2,False,0,3527 6,仁平 章,ヒトヘイ アキラ,142-2086,080-8072-0869,IPMKUcb9J@test.co.jp,FkLBw3NQ,94.42.204.93,http://sample.com,2004/5/27 19:59:07,2012/5/17,True,0,7302 7,芦沢 治,アシザワ オサム,400-1426,080-1336-6892,R2wkyu@sample.net,orBU7WTv,15.158.107.210,http://sample.com,2008/10/28 18:02:33,1973/8/5,False,1,5045 8,甲田 隆之,コウダ タカユキ,001-5573,090-1434-8132,R8kzaAnh@sample.net,PbqK_QQf,84.66.207.170,http://example.net,2015/7/10 15:22:05,1972/11/4,False,0,3327 9,萩原 佳奈,ハギワラ ヨシナ,979-5021,090-8108-0632,XZcZdl0@test.co.jp,AR_OdmXO,13.55.171.197,http://example.com,2015/12/5 12:16:58,1987/3/13,False,1,279 10,大岡 桃香,オオオカ モモカ,691-8324,090-2101-2791,BTZSU@example.co.jp,J4rR75pj,11.97.118.39,http://sample.net,1993/1/25 8:29:46,2009/4/4,True,0,5110
※データ型やマルチバイト文字確認のために個人情報っぽい CSV になりましたが、全てツールで作成した疑似データです。
続いて、以下の Python コードで CSV → Parquet に変換しました。
import argparse import os.path import pandas as pd import pyarrow as pa import pyarrow.parquet as pq parser = argparse.ArgumentParser(description='conv file2parquet') parser.add_argument('file', help='file name') args = parser.parse_args() file_name = os.path.splitext(os.path.basename(args.file))[0] file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), args.file) df = pd.read_csv(file_path) table = pa.Table.from_pandas(df) pq.write_table(table, './{}.parquet'.format(file_name))
Parquet データを BigQuery にロード
作成した Parquet ファイルを GCS に配置して、BigQuery にロードしてみます。
Python クライアントライブラリを使用した以下のコードを実行しました。
from google.cloud import bigquery client = bigquery.Client() dataset_id = 'dataset_2' dataset_ref = client.dataset(dataset_id) job_config = bigquery.LoadJobConfig() job_config.autodetect = True job_config.source_format = bigquery.SourceFormat.PARQUET uri = "gs://test-mikami/data_test/sample.parquet" load_job = client.load_table_from_uri( uri, dataset_ref.table("load_parquet"), job_config=job_config ) # API request print("Starting job {}".format(load_job.job_id)) load_job.result() # Waits for table load to complete. print("Job finished.") destination_table = client.get_table(dataset_ref.table("load_parquet")) print("Loaded {} rows.".format(destination_table.num_rows))
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet.py Starting job d3bf92f2-3b30-4a19-b9b9-ef6ab8f5503e Job finished. Loaded 10 rows.
念のためデータを確認してみます。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq query --use_legacy_sql=false 'SELECT * FROM `cm-da-mikami-yuki-258308`.dataset_2.load_parquet ORDER BY id' Waiting on bqjob_r6503d3677d28bcbd_00000171a0414e50_1 ... (0s) Current status: DONE +----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+ | id | name | kana | postal | tel | mail | password | ip | url | create_timestamp | reg_date | valid | bit | code | +----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+ | 1 | 大和 貫一 | ヤマト カンイチ | 262-8517 | 080-6081-7739 | pzRz6cx6B@test.org | mp5x_0uQ | 150.110.95.198 | http://sample.net | 1988/2/2 7:09:28 | 1991/1/28 | false | 0 | 2607 | | 2 | 篠原 茂志 | シノハラ シゲシ | 025-6473 | 090-7899-2524 | BWeLhiJPN@test.jp | rzFsiiYl | 16.169.224.193 | http://test.jp | 2008/11/15 8:42:14 | 1972/4/25 | true | 0 | 6750 | | 3 | 近江 智恵子 | オウミ チエコ | 129-0310 | 080-6093-0838 | qDnzBhS@test.net | QAOoF2NK | 161.55.51.87 | http://test.com | 2016/4/14 18:47:33 | 2000/8/19 | true | 1 | 4208 | | 4 | 藤原 長治 | フヂワラ チョウジ | 138-8408 | 090-7532-4618 | CGxHpmaS2@sample.net | uwSE1kHr | 171.132.206.6 | http://sample.net | 2018/11/18 12:18:54 | 1972/3/4 | true | 0 | 8333 | | 5 | 湯川 秀明 | ユカワ ヒデアキ | 326-4046 | 080-1220-3065 | IgJh4@example.com | lOuOvfr1 | 247.118.177.127 | http://test.net | 2015/11/26 23:13:43 | 1981/4/2 | false | 0 | 3527 | | 6 | 仁平 章 | ヒトヘイ アキラ | 142-2086 | 080-8072-0869 | IPMKUcb9J@test.co.jp | FkLBw3NQ | 94.42.204.93 | http://sample.com | 2004/5/27 19:59:07 | 2012/5/17 | true | 0 | 7302 | | 7 | 芦沢 治 | アシザワ オサム | 400-1426 | 080-1336-6892 | R2wkyu@sample.net | orBU7WTv | 15.158.107.210 | http://sample.com | 2008/10/28 18:02:33 | 1973/8/5 | false | 1 | 5045 | | 8 | 甲田 隆之 | コウダ タカユキ | 001-5573 | 090-1434-8132 | R8kzaAnh@sample.net | PbqK_QQf | 84.66.207.170 | http://example.net | 2015/7/10 15:22:05 | 1972/11/4 | false | 0 | 3327 | | 9 | 萩原 佳奈 | ハギワラ ヨシナ | 979-5021 | 090-8108-0632 | XZcZdl0@test.co.jp | AR_OdmXO | 13.55.171.197 | http://example.com | 2015/12/5 12:16:58 | 1987/3/13 | false | 1 | 279 | | 10 | 大岡 桃香 | オオオカ モモカ | 691-8324 | 090-2101-2791 | BTZSU@example.co.jp | J4rR75pj | 11.97.118.39 | http://sample.net | 1993/1/25 8:29:46 | 2009/4/4 | true | 0 | 5110 | +----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+
正常にロードできています。
テーブルスキーマを確認してみると bool 型は自動判定してくれましたが、TIMESTAMP 型を判定してくれてないようです。。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq show --schema --format=prettyjson cm-da-mikami-yuki-258308:dataset_2.load_parquet [ { "mode": "NULLABLE", "name": "id", "type": "INTEGER" }, { "mode": "NULLABLE", "name": "name", "type": "STRING" }, { "mode": "NULLABLE", "name": "kana", "type": "STRING" }, { "mode": "NULLABLE", "name": "postal", "type": "STRING" }, { "mode": "NULLABLE", "name": "tel", "type": "STRING" }, { "mode": "NULLABLE", "name": "mail", "type": "STRING" }, { "mode": "NULLABLE", "name": "password", "type": "STRING" }, { "mode": "NULLABLE", "name": "ip", "type": "STRING" }, { "mode": "NULLABLE", "name": "url", "type": "STRING" }, { "mode": "NULLABLE", "name": "create_timestamp", "type": "STRING" }, { "mode": "NULLABLE", "name": "reg_date", "type": "STRING" }, { "mode": "NULLABLE", "name": "valid", "type": "BOOLEAN" }, { "mode": "NULLABLE", "name": "bit", "type": "INTEGER" }, { "mode": "NULLABLE", "name": "code", "type": "INTEGER" } ]
日付フォーマットの問題かと、元の CSV ファイル日付項目もフォーマットをハイフン( - )区切りに変更してみましたが、一向に TIMESTAMP 型で検出してくれず。。
ドキュメント確認すると、Parquet → BigQuery ロード時の型変換に関する記載がありました。
ということは、Parquet ファイルできちんと型定義ができていなかった?
CSV から Parquet に変換する Python コードに Parquet のデータ型定義を追加して、再度 Parquet ファイルを作成しました。
(省略) column_types = { 'id': int64(), 'name': string(), 'kana': string(), 'postal': string(), 'tel': string(), 'password': string(), 'ip': string(), 'url': string(), 'create_timestamp': timestamp('s'), 'reg_date': timestamp('s'), 'valid': bool_(), 'bit': float32(), 'code': float64(), } (省略)
実行してみます。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet.py Starting job 5e0aee11-aa45-4e5a-8514-75e64bbff435 Job finished. Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq query --use_legacy_sql=false 'SELECT create_timestamp, reg_date FROM `cm-da-mikami-yuki-258308`.dataset_2.load_parquet_3 ORDER BY id' Waiting on bqjob_r5b022bd287d25855_00000171a0e6b1da_1 ... (0s) Current status: DONE +---------------------+---------------------+ | create_timestamp | reg_date | +---------------------+---------------------+ | 1988-02-02 07:09:28 | 1991-01-28 00:00:00 | | 2008-11-15 08:42:14 | 1972-04-25 00:00:00 | | 2016-04-14 18:47:33 | 2000-08-19 00:00:00 | | 2018-11-18 12:18:54 | 1972-03-04 00:00:00 | | 2015-11-26 23:13:43 | 1981-04-02 00:00:00 | | 2004-05-27 19:59:07 | 2012-05-17 00:00:00 | | 2008-10-28 18:02:33 | 1973-08-05 00:00:00 | | 2015-07-10 15:22:05 | 1972-11-04 00:00:00 | | 2015-12-05 12:16:58 | 1987-03-13 00:00:00 | | 1993-01-25 08:29:46 | 2009-04-04 00:00:00 | +---------------------+---------------------+ (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq show --schema --format=prettyjson cm-da-mikami-yuki-258308:dataset_2.load_parquet_3 [ { "mode": "NULLABLE", "name": "id", "type": "INTEGER" }, { "mode": "NULLABLE", "name": "name", "type": "STRING" }, { "mode": "NULLABLE", "name": "kana", "type": "STRING" }, { "mode": "NULLABLE", "name": "postal", "type": "STRING" }, { "mode": "NULLABLE", "name": "tel", "type": "STRING" }, { "mode": "NULLABLE", "name": "mail", "type": "STRING" }, { "mode": "NULLABLE", "name": "password", "type": "STRING" }, { "mode": "NULLABLE", "name": "ip", "type": "STRING" }, { "mode": "NULLABLE", "name": "url", "type": "STRING" }, { "mode": "NULLABLE", "name": "create_timestamp", "type": "TIMESTAMP" }, { "mode": "NULLABLE", "name": "reg_date", "type": "TIMESTAMP" }, { "mode": "NULLABLE", "name": "valid", "type": "BOOLEAN" }, { "mode": "NULLABLE", "name": "bit", "type": "FLOAT" }, { "mode": "NULLABLE", "name": "code", "type": "FLOAT" } ]
今度は、きちんと Parquet で定義したデータ型通りにテーブルスキーマが自動作成されました。
圧縮した Parquet データを BigQuery にロード
ドキュメントによると、Parquet データでは、以下の圧縮形式をサポートしているとのことです。
- Snappy
- GZip
- LZO_1C and LZO_1X
SNAPPY、GZIP、LZ4、ZSTD 4つの圧縮形式のファイルを準備しました。
それぞれ、BigQuery にロードしてみます。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py SNAPPY Starting job d07720c7-107d-43bd-b08e-ff40caddca53 Job finished. Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py GZIP Starting job 558c2d7d-4970-4591-b787-3442b811106c Job finished. Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py LZ4 Starting job a6406969-5e65-4ec7-a932-0488ea8d7368 Job finished. Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py ZSTD Starting job 6bd4a1a4-705a-423e-9ea0-cc6ab258735b Job finished. Loaded 10 rows.
LZ4 と ZSTD はドキュメントには記載がなかった形式ですが、ロード成功したようです。
データ内容も確認しましたが、問題なくロードできていました。
ドキュメントに記載のない圧縮形式でもロードはできましたが、データ型のバリエーションなど十分なサンプルデータではないので、サポート明記のある圧縮形式データを使用した方が無難でしょうか。
CSV と Parquet データロード時間を比較
体感的に、Parquet データのロードの場合、CSV や JSON データをロードした時よりも時間がかかっているように感じました。
ロードファイルが CSV の場合と、圧縮なしの Parquet の場合、SNAPPY 圧縮の Parquet の場合で、ロード処理処理時間にどのくらい差分があるのか確認してみます。
データロード用の Python コードを、テーブルがない場合は新規作成、ある場合は追記モードでロードするよう変更し、ロード完了のタイミングで処理時間を出力するよう修正しました。
from google.cloud import bigquery import time t1 = time.time() (省略) load_job = client.load_table_from_uri( uri, dataset_ref.table("load_csv"), job_config=job_config ) # API request print("Starting job {}".format(load_job.job_id)) load_job.result() # Waits for table load to complete. print("Job finished. -> {} sec".format(time.time()-t1)) destination_table = client.get_table(dataset_ref.table("load_csv")) print("Loaded {} rows.".format(destination_table.num_rows))
まずは CSV ファイルをロードします。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py Starting job 13369eaa-2784-4b52-8037-32ffccd9a883 Job finished. -> 1.900064468383789 sec Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py Starting job 0b222a3e-306a-4b1d-9622-6fd9af32d3d6 Job finished. -> 2.299731731414795 sec Loaded 20 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py Starting job 7562ade7-a855-4347-bf4e-9221526df9ca Job finished. -> 1.862321376800537 sec Loaded 30 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py Starting job 3363bb79-65a5-4d0b-831b-17bb560c377c Job finished. -> 1.6943325996398926 sec Loaded 40 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py Starting job 77b740d4-4c8a-46fb-8acd-3bcc5bb89c48 Job finished. -> 1.9005563259124756 sec Loaded 50 rows.
クライアントライブラリ経由なので API 通信が必要になり、ネットワーク速度の影響もあるため純粋にデータロード時間だけというわけではありませんが、CSV ファイルの場合、ロード時間はだいたい 2 秒ほどでした。
次に、圧縮なしの Parquet ファイルを同様にロードしてみます。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py Starting job 0426ca2b-e9f2-4fe0-a90d-a804b4272d01 Job finished. -> 8.229813814163208 sec Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py Starting job 7837e5a7-fe70-43cf-8538-75740b48be0b Job finished. -> 8.890910625457764 sec Loaded 20 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py Starting job b8a69558-b8e6-4ef8-b783-f0bf5b24d8c1 Job finished. -> 14.763366937637329 sec Loaded 30 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py Starting job ef34757c-af56-47e3-aa5e-58ed0282f879 Job finished. -> 6.311228036880493 sec Loaded 40 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py Starting job de632b3b-e65c-4c48-bbdf-20d964becc99 Job finished. -> 9.726810693740845 sec Loaded 50 rows.
API通信の問題か外れ値はあるものの、約 8 秒ほどかかっています。。
最後に、SNAPPY 圧縮の Parquet データです。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py Starting job 0a9c59a2-5b37-440f-a5c1-402fa37c29e0 Job finished. -> 11.509429454803467 sec Loaded 10 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py Starting job 72733262-a77c-445f-810b-374169abf989 Job finished. -> 6.40911602973938 sec Loaded 20 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py Starting job 091a6961-3420-4cf6-be80-3da06206522f Job finished. -> 6.870298385620117 sec Loaded 30 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py Starting job 8c3061ca-d7b6-4248-8b7f-e24dcba9496f Job finished. -> 10.682614088058472 sec Loaded 40 rows. (test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py Starting job a12066af-c910-4dbb-97be-38a46f3e9fc2 Job finished. -> 11.705443859100342 sec Loaded 50 rows.
圧縮なしの Parquet よりも、1 秒くらい多く時間がかかっているようです。。
列指向の BigQuery なので、同じく列指向フォーマットの Parquet の方がロード処理が速いかと思いましたが、データロード時にはおそらく型変換などのために一度ファイルデータをパースする必要があるため、シンプルなフォーマットのソースデータファイルの方がより高速に処理できるようです。
GCS 上でパティショニングされた Parquet ファイルをロード
Hive や Spark で使用される Parquet ファイルは、ディレクトリパスに年、月、日などの指定を含むパーティショニング状態で配置されることが多いと思います。
BigQuery へのデータロードでは、Parquet に限らず、パーティショニングデータのロードをオプションで指定することができます。
以下の、hive_partitioning オプション指定のある Python コードで Parquet データをロードしてみます。
※2020/04 現在、Python クライアントライブラリでは hive_partitioning オプションはベータ版とのことです。
(省略) job_config = bigquery.LoadJobConfig() job_config.autodetect = True job_config.source_format = bigquery.SourceFormat.PARQUET opt = bigquery.external_config.HivePartitioningOptions() opt.mode = 'AUTO' opt.source_uri_prefix = 'gs://test-mikami/data_test/ext_parquet/' job_config.hive_partitioning = opt uri = 'gs://test-mikami/data_test/ext_parquet/*' load_job = client.load_table_from_uri( uri, dataset_ref.table("load_parquet_partition"), job_config=job_config ) # API request (省略)
GCS には、/dt=2020-04-21/、/dt=2020-04-22/、/dt=2020-04-23/ の3つのパスの配下に、それぞれ1ファイルずつ配置してある状態です。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ gsutil ls -r gs://test-mikami/data_test/ext_parquet gs://test-mikami/data_test/ext_parquet/: gs://test-mikami/data_test/ext_parquet/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/: gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/sample_p1.parquet gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/: gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/sample_p2.parquet gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/: gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/sample_p3.parquet
実行してみます。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py Starting job 1c9fcdf3-89dd-4a43-9323-0c114eb87451 Job finished. Loaded 10 rows.
パス分割されていた全ての配置ファイルのデータがロードされていることが確認できました。
hive_partitioning オプションを指定する場合、Parquet ファイルの配置パスは Hive パーティショニングレイアウトに従う必要があります。
試しに、パスが不正なファイルを追加してみます。
uri_prefix の直下と dt=YYYY-MM-DD を含まないパスの下にも、Parquet ファイルを配置しました。
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ gsutil ls -r gs://test-mikami/data_test/ext_parquet gs://test-mikami/data_test/ext_parquet/: gs://test-mikami/data_test/ext_parquet/ gs://test-mikami/data_test/ext_parquet/sample_p1.parquet gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/: gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/sample_p1.parquet gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/: gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/sample_p2.parquet gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/: gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/ gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/sample_p3.parquet gs://test-mikami/data_test/ext_parquet/work/: gs://test-mikami/data_test/ext_parquet/work/ gs://test-mikami/data_test/ext_parquet/work/sample_p1.parquet
ロード実行してみると
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py Starting job 8bf21632-6f36-432e-aa2b-14290245d773 Traceback (most recent call last): File "load_parquet_partition.py", line 21, in <module> load_job.result() # Waits for table load to complete. File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 818, in result return super(_AsyncJob, self).result(timeout=timeout) File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/api_core/future/polling.py", line 127, in result raise self._exception google.api_core.exceptions.BadRequest: 400 Partition keys should be invariant from table creation across all partitions, with the number of partition keys held constant with invariant names. Expected 0 partition keys ([]), but 1 ([dt]) were encountered along path /bigstore/test-mikami/data_test/ext_parquet/dt=2020-04-22.
パーティションキーが不正だと怒られました。。
なお、ロード対象のソースデータファイルをワイルドカードで指定すれば、パーティションキーにかかわらず、複数ファイルを一度にロードすることもできます。
(省略) job_config = bigquery.LoadJobConfig() job_config.autodetect = True job_config.source_format = bigquery.SourceFormat.PARQUET #opt = bigquery.external_config.HivePartitioningOptions() #opt.mode = 'AUTO' #opt.source_uri_prefix = 'gs://test-mikami/data_test/ext_parquet/' #job_config.hive_partitioning = opt uri = 'gs://test-mikami/data_test/ext_parquet/*' load_job = client.load_table_from_uri( # uri, dataset_ref.table("load_parquet_partition"), job_config=job_config uri, dataset_ref.table("load_parquet_multi"), job_config=job_config ) # API request (省略)
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py Starting job 1e93d04f-ead5-4dbe-8b6f-28adae1aef43 Job finished. Loaded 16 rows.
ファイルストレージ上のパーティショニングレイアウトに一致するファイルのみをロードしたい場合は、パーティショニングオプションを指定しておくと良さそうです。
パーティショニングされた Parquet ファイルを参照する外部テーブルを作成
BigQuery では、GCS 上のファイルなどの外部リソースに対して、通常のテーブル同様に SQL クエリを実行することができます。
先ほどは GCS から BigQuery にデータをロードして参照しましたが、GCS にパーティショニングされている Parquet ファイルをロードなしで直接参照する、外部テーブルを定義してみます。
※現時点では、Python クライアントライブラリ経由での Parquet ファイルの外部テーブル作成は未サポートのようです。
BigQuery 管理画面から、「ソースデータパーティショニング」を指定して、「外部テーブル」を作成しました。
SQL を実行してデータを確認してみます。
複数のパスに配置された全ての Parquet ファイルのデータが参照可能なことが確認できました。
外部テーブル作成後に GCS に dt=YYYY-MM-DD フォーマットの新しいパスを作成し、Parquet ファイルを追加して、再度 BigQuery テーブルに SQL クエリを実行してみます。
後から追加した Parquet ファイルのデータも BigQuery の外部テーブルで参照できることが確認できました。
まとめ(所感)
特にデータ分析業務では Parquet ファイルを扱うケースも多々あるかと思います。
BigQuery で外部テーブルを定義しておけば、例えば他のシステムからデイリーで Parquet ファイルが出力される場合など、ロード処理なしに GCS 上のパーティショニングデータを BigQuery のテーブルデータとして参照できるので、便利なのではないかと思いました。